[SPARK-33817][SQL] CACHE TABLE uses a logical plan when caching a query to avoid creating a dataframe #30815
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure

cc @cloud-fan, thanks!
```scala
 * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
 * recomputing the in-memory columnar representation of the underlying table is expensive.
 */
def cacheQueryWithLogicalPlan(
```
shall we just call it `cacheQuery` to be consistent with `uncacheQuery`?
Unfortunately, we cannot, because Scala doesn't allow default arguments on more than one overloaded alternative of a method.
Ah, I think we can do the following:

```scala
def cacheQuery(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String]): Unit = {
  cacheQuery(spark, planToCache, tableName, MEMORY_AND_DISK)
}

def cacheQuery(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String],
    storageLevel: StorageLevel)
```
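As a self-contained illustration of this delegating-overload pattern, here is a minimal sketch using hypothetical stand-in types (plain `String`s and `Option[String]`, not Spark's real `SparkSession`/`LogicalPlan`/`StorageLevel`). Scala rejects default arguments on more than one overloaded alternative, so the default storage level lives on a single delegating overload instead:

```scala
// Hypothetical sketch, not Spark's real API: stand-in types replace
// SparkSession / LogicalPlan / StorageLevel.
object CacheSketch {
  val MEMORY_AND_DISK = "MEMORY_AND_DISK"

  // Full parameter list, no defaults.
  def cacheQuery(plan: String, tableName: Option[String], storageLevel: String): String =
    s"cache ${tableName.getOrElse(plan)} at $storageLevel"

  // Delegating overload that supplies the default storage level. Giving both
  // alternatives default arguments would not compile.
  def cacheQuery(plan: String, tableName: Option[String]): String =
    cacheQuery(plan, tableName, MEMORY_AND_DISK)
}

println(CacheSketch.cacheQuery("Range(0, 10)", Some("t1")))
// prints: cache t1 at MEMORY_AND_DISK
```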
cloud-fan left a comment:
LGTM
```scala
def cacheQueryWithLogicalPlan(
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String] = None,
```
does this method need default parameter values?
I think https://github.com/apache/spark/pull/30815/files#r544822268 should work. Let me update this PR. Thanks!
Kubernetes integration test starting
Kubernetes integration test status success

Test build #132921 has finished for PR 30815 at commit

Test build #132934 has finished for PR 30815 at commit

retest this please

Kubernetes integration test starting
Kubernetes integration test status success

cc @sunchao
```scala
 * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
 * recomputing the in-memory columnar representation of the underlying table is expensive.
 */
def cacheQuery(
```
+1 on this! I think we may replace one more usage in `DataSourceV2Strategy.invalidateCache` as well.
thanks, updated.
```scala
    spark: SparkSession,
    planToCache: LogicalPlan,
    tableName: Option[String],
    storageLevel: StorageLevel): Unit = {
```
perhaps we can just keep a single method with default value of `tableName` being `None` and `storageLevel` being `MEMORY_AND_DISK`?
The Scala compiler will complain if we do that. See #30815 (comment)
Ah got it. Thanks.
Kubernetes integration test starting
Kubernetes integration test status failure

Test build #132959 has finished for PR 30815 at commit

Test build #132964 has finished for PR 30815 at commit

thanks, merging to master!
What changes were proposed in this pull request?

This PR proposes to update `CACHE TABLE` to use a `LogicalPlan` when caching a query to avoid creating a `DataFrame`, as suggested here: #30743 (comment)

For reference, `UNCACHE TABLE` also uses a `LogicalPlan`:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
Lines 91 to 98 in 0c12900

Why are the changes needed?

To avoid creating an unnecessary `DataFrame` and to make it consistent with `uncacheQuery` used in `UNCACHE TABLE`.

Does this PR introduce any user-facing change?

No, just internal changes.

How was this patch tested?

Existing tests, since this is an internal refactoring change.
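The before/after shape of the change can be sketched as follows, using hypothetical simplified stand-in classes rather than Spark's real internals: previously the command wrapped the plan in a `DataFrame` only so the cache manager could read the plan back out; after the change, the `LogicalPlan` is handed over directly.

```scala
// Hedged sketch with hypothetical stand-in classes (not Spark's real API).
final case class LogicalPlan(description: String)
final case class DataFrame(plan: LogicalPlan) // heavyweight wrapper in real Spark

object CacheManagerSketch {
  // Before: callers had to materialize a DataFrame first.
  def cacheQueryOld(df: DataFrame, tableName: Option[String]): String =
    cacheQuery(df.plan, tableName)

  // After: the logical plan is cached directly, no DataFrame needed.
  def cacheQuery(plan: LogicalPlan, tableName: Option[String]): String =
    s"cached ${tableName.getOrElse("<unnamed>")}: ${plan.description}"
}

val plan = LogicalPlan("Project [id] Range(0, 10)")
// Both paths cache the same plan; the new one skips the wrapper object.
println(CacheManagerSketch.cacheQuery(plan, Some("t1")))
println(CacheManagerSketch.cacheQueryOld(DataFrame(plan), Some("t1")))
```

Both calls produce the same result; the refactoring simply removes the intermediate wrapper allocation on the `CACHE TABLE` path.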